

package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.memory.AccumulationLimitExceededException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.PriorityQueue;

public class SortP<T> extends AbstractProcessor {

    private final PriorityQueue<T> priorityQueue;
    private final Traverser<T> resultTraverser;

    private long maxItems;

    public SortP(@Nullable Comparator<T> comparator) {
        this.priorityQueue = new PriorityQueue<>(comparator);
        this.resultTraverser = priorityQueue::poll;
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        maxItems = context.maxProcessorAccumulatedRecords();
    }

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess0(@Nonnull Object item) {
        if (priorityQueue.size() == maxItems) {
            throw new AccumulationLimitExceededException();
        }

        priorityQueue.add((T) item);
        return true;
    }

    @Override
    public boolean complete() {
        return emitFromTraverser(resultTraverser);
    }

    @Override
    public boolean closeIsCooperative() {
        return true;
    }
}
